package com.bd08.flink.demo.jing

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object CoGroupDemo extends App {
  val benv = ExecutionEnvironment.getExecutionEnvironment
  val ds1=benv.fromCollection(List((1,1),(2,2),(1,3),(2,4)))
  val ds2=benv.fromCollection(List((1,5),(2,6),(1,7),(2,8)))
  ds1.coGroup(ds2).where(0).equalTo(0).apply((left,right)=>{
    println("------------------")
    print("left group:")
    left.foreach(print)
    print("right group:")
    right.foreach(print)
    println()
  }).collect()


}
